MySQL Binlog 解析工具 Maxwell
常用的MySQL Binlog解析工具主要有阿里的canal、mysql_streamer,三个工具对比如下:
maxwell 简介
Maxwell是一个能实时读取MySQL二进制日志binlog,并生成 JSON 格式的消息,作为生产者发送给 Kafka,Kinesis、RabbitMQ、Redis、Google Cloud Pub/Sub、文件或其它平台的应用程序。它的常见应用场景有ETL、维护缓存、收集表级别的dml指标、增量到搜索引擎、数据分区迁移、切库binlog回滚方案等。官网(http://maxwells-daemon.io)、GitHub(https://github.com/zendesk/maxwell)
Maxwell主要提供了下列功能:
- 支持
SELECT * FROM table
的方式进行全量数据初始化 - 支持在主库发生failover后,自动恢复binlog位置(GTID)
- 可以对数据进行分区,解决数据倾斜问题,发送到kafka的数据支持database、table、column等级别的数据分区
- 工作方式是伪装为Slave,接收binlog events,然后根据schemas信息拼装,可以接受ddl、xid、row等各种event
canal 由Java开发,分为服务端和客户端,拥有众多的衍生应用,性能稳定,功能强大;canal 需要自己编写客户端来消费canal解析到的数据。
maxwell相对于canal的优势是使用简单,它直接将数据变更输出为json字符串,不需要再编写客户端。
mysql启动binlog
1 | [mysqld] |
mysql创建用户
Maxwell用户,并赋予 maxwell 库的一些权限
1 | CREATE USER 'maxwell'@'%' IDENTIFIED BY '123456'; |
启动kafka集群
docker 快速安装并使用 Maxwell
1 | # 拉取镜像 |
测试Maxwell表
首先创建一张简单的表,然后增改删数据
1 | CREATE TABLE `test` ( |
观察docker控制台的输出,从输出的日志中可以看出Maxwell解析出的binlog的JSON字符串的格式
1 | {"database":"test","table":"test","type":"insert","ts":1552153502,"xid":832,"commit":true,"data":{"id":1,"age":22,"name":"小旋锋"}} |
输出到 Kafka,关闭 docker,重新设置启动参数
1 | docker run -it --rm zendesk/maxwell bin/maxwell --user='maxwell' \ |
然后启动一个消费者来消费 maxwell topic的消息,观察其输出;再一次执行增改删数据的SQL,仍然可以得到相同的输出
1 | bin/kafka-console-consumer.sh --bootstrap-server localhost:9092 --topic maxwell |
可以把Maxwell的启动参数写到一个配置文件 config.properties
中,然后通过 config 选项指定,bin/maxwell --config config.properties
1 | user=maxwell |
Maxwell 会将消息投递到Kafka的Topic中,该Topic由 kafka_topic
选项指定,默认值为 maxwell
,除了指定为静态的Topic,还可以指定为动态的,譬如 namespace_%{database}_%{table}
,%{database}
和 %{table}
将被具体的消息的 database 和 table 替换。
解析输出JSON字符串
具体见其他文档
参考地址